Clustering Model Evaluation

For reference, see scikit-learn's documentation on clustering performance evaluation (homogeneity, completeness, and V-measure).
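
These follow the standard V-measure definitions (Rosenberg and Hirschberg, 2007), which are also what scikit-learn implements. With n samples in total, n_c samples of class c, n_k samples in cluster k, and n_ck samples of class c assigned to cluster k:

H(C|K) = -sum over c,k of (n_ck / n) * log2(n_ck / n_k)
H(C)   = -sum over c   of (n_c  / n) * log2(n_c  / n)

homogeneity   h = 1 - H(C|K) / H(C)
completeness  c = 1 - H(K|C) / H(K)
V-measure     v = 2 * h * c / (h + c)

The base of the logarithm cancels in the ratios, so the log2 used below is equivalent to scikit-learn's natural log.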


In [41]:
import org.apache.spark.sql.DataFrame

/** Calculates the homogeneity cluster score
 *
 *  @param df a Spark DataFrame of the input data
 *  @param truth a String column name of a column in the df where the true class is stored
 *  @param pred a String column name of a column in the df where the cluster id is stored
 */
def homogeneity_score(df: DataFrame, truth: String, pred: String): Double = {
  import org.apache.spark.sql.functions.{sum, udf, lit}
    
  def log2 = (x: Double) => scala.math.log10(x)/scala.math.log10(2.0)

  def entropy(count: Long, n: Long, n_k: Long): Double = {
    -(count.toDouble / n) * log2(count.toDouble / n_k)
  }
  val udf_entropy = udf(entropy _)

  val n = df.count()
  val classes = df.groupBy(truth).count()
  val clusters = df.groupBy(pred).count().toDF(pred, "count_k")
  // number of samples of class c assigned to cluster k
  val n_ck = df.groupBy(truth, pred).count()

  val entropy_of_classes = (classes.withColumn("entropy", 
                                               udf_entropy(classes("count"),
                                                           lit(n), 
                                                           lit(n)))
                                   .agg(sum("entropy"))
                                   .first()
                                   .getDouble(0))
  
  val joined_df = n_ck.as("n_ck").join(clusters, pred)
  val conditional_entropy = (joined_df.withColumn("c_entropy", 
                                                  udf_entropy(joined_df("count"), 
                                                                lit(n), 
                                                                joined_df("count_k")))
                                      .agg(sum("c_entropy"))
                                      .first()
                                      .getDouble(0))
    
  // If every sample has the same class, H(C) = 0; the score is 1.0 by convention.
  if (entropy_of_classes == 0.0) 1.0 else 1.0 - conditional_entropy / entropy_of_classes
}

/** Calculates the completeness cluster score
 *
 *  @param df a Spark DataFrame of the input data
 *  @param truth a String column name of a column in the df where the true class is stored
 *  @param pred a String column name of a column in the df where the cluster id is stored
 *  @param two_plus when true, keep only rows whose truth class occurs at least twice before scoring
 */
def completeness_score(df: DataFrame, truth: String, pred: String, two_plus: Boolean = false): Double = {
  import org.apache.spark.sql.functions.{count, lit}

  // Optionally keep only rows whose truth class appears at least twice.
  val filtered_df = if (two_plus) {
    println("filtering for 2+")
    (df.groupBy(truth)
       .agg(count(lit(1)).alias("count"))
       .as("df1")
       .join(df.as("df2"), truth)
       .filter("count > 1")
       .select(truth, pred))
  } else df

  // Completeness is homogeneity with the roles of class and cluster swapped.
  homogeneity_score(filtered_df, pred, truth)
}

/** Calculates the V-measure: the harmonic mean of the homogeneity and completeness scores
 *
 *  @param df a Spark DataFrame of the input data
 *  @param truth a String column name of a column in the df where the true class is stored
 *  @param pred a String column name of a column in the df where the cluster id is stored
 */
def v_measurement_score(df: DataFrame, truth: String, pred: String): Double = {
  val h = homogeneity_score(df, truth, pred)
  val c = completeness_score(df, truth, pred)
  // If both scores are zero, the harmonic mean is 0.0 by convention.
  if (h + c == 0.0) 0.0 else 2 * h * c / (h + c)
}
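
The scorers only need a DataFrame holding a true-label column and a cluster-id column, so they can run directly on the output of a Spark ML clustering model. A minimal sketch, assuming a hypothetical labeled_df with numeric feature columns "x" and "y" and a "truth" label column (none of which appear in this notebook):

In [ ]:
import org.apache.spark.ml.clustering.KMeans
import org.apache.spark.ml.feature.VectorAssembler

// Assemble the assumed feature columns into the vector column KMeans expects.
val assembler = new VectorAssembler()
  .setInputCols(Array("x", "y"))
  .setOutputCol("features")
val features = assembler.transform(labeled_df)

// Fit KMeans and rename its "prediction" output to match the scorers' pred argument.
val model = new KMeans().setK(3).setSeed(1L).fit(features)
val clustered = model.transform(features).withColumnRenamed("prediction", "pred")

v_measurement_score(clustered, "truth", "pred")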

Testing

Create Data


In [16]:
import org.apache.spark.sql.Row

// Dataset 1
case class jz_row(truth: String, pred: String)
val table = Seq(jz_row("0", "a"), jz_row("0", "a"), jz_row("0", "a"), jz_row("0", "b"),
                jz_row("1", "b"), jz_row("1", "c"), jz_row("1", "c"), jz_row("2", "d"))
var df = spark.createDataFrame(table)

In [17]:
// Dataset 2
val schema = df.schema
val labels_true = "0,0,0,1,1,1,3,3,3,5,5,5,5,5,5,5,5".split(",").toList
val labels_pred = "0,1,1,1,1,1,3,3,3,5,5,5,5,5,5,5,5".split(",").toList
val rows = labels_true zip labels_pred
val rdd = sc.parallelize(rows).map(x => Row(x._1, x._2))
df = spark.createDataFrame(rdd, schema)

In [29]:
// Dataset 3
val schema = df.schema
val labels_true = "0,0,0,1,1,1,3,3,3,5,5,5,5,5,5,5,5,6,7,8".split(",").toList
val labels_pred = "0,1,1,1,1,1,3,3,3,5,5,5,5,5,5,5,5,7,8,9".split(",").toList

val rows = labels_true zip labels_pred
val rdd = sc.parallelize(rows).map(x => Row(x._1, x._2))
df = spark.createDataFrame(rdd, schema)

In [42]:
df.show()


+-----+----+
|truth|pred|
+-----+----+
|    0|   0|
|    0|   1|
|    0|   1|
|    1|   1|
|    1|   1|
|    1|   1|
|    3|   3|
|    3|   3|
|    3|   3|
|    5|   5|
|    5|   5|
|    5|   5|
|    5|   5|
|    5|   5|
|    5|   5|
|    5|   5|
|    5|   5|
|    6|   7|
|    7|   8|
|    8|   9|
+-----+----+


In [43]:
homogeneity_score(df, "truth", "pred")


Out[43]:
0.8992244133520476

In [44]:
completeness_score(df, "truth", "pred")


Out[44]:
0.940207373487672

In [45]:
completeness_score(df, "truth", "pred", true)


filtering for 2+
Out[45]:
0.9054029493697273

In [46]:
v_measurement_score(df, "truth", "pred")


Out[46]:
0.9192593385659383
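
As a final sanity check (a sketch, not one of the original runs): with a perfect clustering, where each class maps one-to-one to a cluster, H(C|K) and H(K|C) are both zero, so all three scores should come out as exactly 1.0.

In [ ]:
// Perfect one-to-one mapping between classes and clusters.
val perfect = Seq(jz_row("0", "a"), jz_row("0", "a"),
                  jz_row("1", "b"),
                  jz_row("2", "c"), jz_row("2", "c"))
val perfect_df = spark.createDataFrame(perfect)
homogeneity_score(perfect_df, "truth", "pred")    // expected: 1.0
completeness_score(perfect_df, "truth", "pred")   // expected: 1.0
v_measurement_score(perfect_df, "truth", "pred")  // expected: 1.0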
